{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Register TSV Data With Athena\n",
    "This will create an Athena table in the Glue Catalog (Hive Metastore)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have a database, we’re ready to create a table that’s based on the `Amazon Customer Reviews Dataset`. We define the columns that map to the data, specify how the data is delimited, and provide the location in Amazon S3 for the file(s). \n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/athena_register_tsv.png\" width=\"60%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ingest_create_athena_table_tsv_passed = False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r ingest_create_athena_db_passed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    ingest_create_athena_db_passed\n",
    "except NameError:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not create the Athena Database.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(ingest_create_athena_db_passed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not ingest_create_athena_db_passed:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not create the Athena Database.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "else:\n",
    "    print(\"[OK]\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r s3_private_path_tsv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    s3_private_path_tsv\n",
    "except NameError:\n",
    "    print(\"*****************************************************************************\")\n",
    "    print(\"[ERROR] PLEASE RE-RUN THE PREVIOUS COPY TSV TO S3 NOTEBOOK ******************\")\n",
    "    print(\"[ERROR] THIS NOTEBOOK WILL NOT RUN PROPERLY. ********************************\")\n",
    "    print(\"*****************************************************************************\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(s3_private_path_tsv)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Import PyAthena"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyathena import connect"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Athena Table from Local TSV Files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Dataset columns\n",
    "\n",
    "- `marketplace`: 2-letter country code (in this case all \"US\").\n",
    "- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.\n",
    "- `review_id`: A unique ID for the review.\n",
    "- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.\n",
    "- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.\n",
    "- `product_title`: Title description of the product.\n",
    "- `product_category`: Broad product category that can be used to group reviews (in this case digital videos).\n",
    "- `star_rating`: The review's rating (1 to 5 stars).\n",
    "- `helpful_votes`: Number of helpful votes for the review.\n",
    "- `total_votes`: Number of total votes the review received.\n",
    "- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?\n",
    "- `verified_purchase`: Was the review from a verified purchase?\n",
    "- `review_headline`: The title of the review itself.\n",
    "- `review_body`: The text of the review.\n",
    "- `review_date`: The date the review was written.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set S3 staging directory -- this is a temporary directory used for Athena queries\n",
    "s3_staging_dir = \"s3://{0}/athena/staging\".format(bucket)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set Athena parameters\n",
    "database_name = \"dsoaws\"\n",
    "table_name_tsv = \"amazon_reviews_tsv\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# SQL statement to execute\n",
    "statement = \"\"\"CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(\n",
    "         marketplace string,\n",
    "         customer_id string,\n",
    "         review_id string,\n",
    "         product_id string,\n",
    "         product_parent string,\n",
    "         product_title string,\n",
    "         product_category string,\n",
    "         star_rating int,\n",
    "         helpful_votes int,\n",
    "         total_votes int,\n",
    "         vine string,\n",
    "         verified_purchase string,\n",
    "         review_headline string,\n",
    "         review_body string,\n",
    "         review_date string\n",
    ") ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\\\t' LINES TERMINATED BY '\\\\n' LOCATION '{}'\n",
    "TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')\"\"\".format(\n",
    "    database_name, table_name_tsv, s3_private_path_tsv\n",
    ")\n",
    "\n",
    "print(statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "pd.read_sql(statement, conn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Verify The Table Has Been Created Succesfully"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "statement = \"SHOW TABLES in {}\".format(database_name)\n",
    "\n",
    "df_show = pd.read_sql(statement, conn)\n",
    "df_show.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if table_name_tsv in df_show.values:\n",
    "    ingest_create_athena_table_tsv_passed = True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store ingest_create_athena_table_tsv_passed"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run A Sample Query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "product_category = \"Digital_Software\"\n",
    "\n",
    "statement = \"\"\"SELECT * FROM {}.{}\n",
    "    WHERE product_category = '{}' LIMIT 100\"\"\".format(\n",
    "    database_name, table_name_tsv, product_category\n",
    ")\n",
    "\n",
    "print(statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.read_sql(statement, conn)\n",
    "df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not df.empty:\n",
    "    print(\"[OK]\")\n",
    "else:\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] YOUR DATA HAS NOT BEEN REGISTERED WITH ATHENA. LOOK IN PREVIOUS CELLS TO FIND THE ISSUE.\")\n",
    "    print(\"++++++++++++++++++++++++++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review the New Athena Table in the Glue Catalog"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/glue/home?region={}#\">AWS Glue Catalog</a></b>'.format(\n",
    "            region\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Store Variables for the Next Notebooks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
